/* Payment Platform from Lucky Byte, Inc.
 *
 * Copyright (c) 2016 Lucky Byte, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lucky_byte.pay.p351;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.luaj.vm2.LuaError;
import org.luaj.vm2.LuaValue;

import com.google.gson.JsonObject;
import com.lucky_byte.pay.jar.ByteUtils;
import com.lucky_byte.pay.jar.Jdbc;
import com.lucky_byte.pay.jar.JdbcRecord;
import com.lucky_byte.pay.jar.LuaInterp;
import com.lucky_byte.pay.jar.Runtime;
import com.lucky_byte.pay.jar.lualib.LuaJdbcRecord;
import com.lucky_byte.pay.jar.lualib.LuaRequest;
import com.lucky_byte.pay.jar.p351.Packet351;

/**
 * Handles the requests of one connected client.
 *
 * <p>A worker owns one accepted socket. It repeatedly reads a request packet
 * (2-byte big-endian length prefix followed by the packet body), hands it to
 * the {@code /lua/main.lua} script and writes the packet returned by the
 * script back to the client using the same framing. The socket is always
 * closed when {@link #run()} returns, whether normally or exceptionally.
 */
public class Worker implements Runnable
{
	private static final Logger logger = LogManager.getLogger();

	/** Largest value that fits in the 2-byte length prefix. */
	private static final int MAX_PREFIX_VALUE = 0xFFFF;

	private final Socket accept_socket;
	private final JdbcRecord chnl;

	/**
	 * @param accept_socket the accepted client connection served by this worker
	 * @param chnl the channel record; its {@code indirect} flag selects how
	 *        responses are serialized
	 */
	public Worker(Socket accept_socket, JdbcRecord chnl) {
		this.accept_socket = accept_socket;
		this.chnl = chnl;
	}

	/**
	 * Reads the 2-byte big-endian length prefix of a packet.
	 *
	 * @param data_is stream positioned at the start of a length prefix
	 * @return the packet body length, or {@code -1} when the announced length
	 *         is not within (22, 4096]
	 * @throws IOException on a read error; {@link EOFException} when the
	 *         stream ends before two bytes were read
	 */
	private short readPacketSize(DataInputStream data_is) throws IOException {
		byte[] bytes = new byte[2];
		data_is.readFully(bytes);

		ByteBuffer byte_buff = ByteBuffer.wrap(bytes);
		byte_buff.order(ByteOrder.BIG_ENDIAN);
		short pkt_size = byte_buff.getShort();
		// Prefix values above 32767 come out negative here and are therefore
		// rejected by the lower bound as well.
		if (pkt_size <= 22 || pkt_size > 4096) {
			logger.error("客户端[{}][{}]发送的报文长度[{}]有误.",
					accept_socket.getInetAddress().getHostAddress(),
					accept_socket.getPort(), pkt_size);
			return -1;
		}
		logger.info("读取报文前缀长度[{}][{}].", pkt_size,
				ByteUtils.format(bytes));
		return pkt_size;
	}

	/**
	 * Reads one complete request packet from the client.
	 *
	 * @param socket the client connection to read from
	 * @return the packet body, or {@code null} when the length prefix is invalid
	 * @throws IOException on a read error; {@link EOFException} when the
	 *         client closed the connection
	 */
	private byte[] readPacket(Socket socket) throws IOException {
		DataInputStream data_is =
				new DataInputStream(socket.getInputStream());
		short pkt_size = this.readPacketSize(data_is);
		if (pkt_size < 0) {
			return null;
		}
		byte[] bytes = new byte[pkt_size];
		data_is.readFully(bytes);
		ByteUtils.log("接收客户端报文:", bytes);
		return bytes;
	}

	/**
	 * Writes the 2-byte big-endian length prefix of a response.
	 *
	 * @param data_os stream to write to
	 * @param pkt_size the body length; interpreted as an unsigned 16-bit value
	 * @throws IOException on a write error
	 */
	private void writePacketSize(DataOutputStream data_os, short pkt_size)
			throws IOException {
		ByteBuffer byte_buff = ByteBuffer.allocate(2);
		byte_buff.order(ByteOrder.BIG_ENDIAN);
		byte_buff.putShort(pkt_size);
		data_os.write(byte_buff.array());
		// Log the unsigned value so lengths above 32767 are not shown negative.
		logger.trace("响应客户端报文长度[{}].", pkt_size & MAX_PREFIX_VALUE);
	}

	/**
	 * Sends a response packet to the client.
	 *
	 * <p>When the channel's {@code indirect} flag is set the packet is always
	 * re-packed; otherwise the packet's {@code inBytes()} are sent when
	 * present, falling back to re-packing. If no bytes can be produced, or
	 * they do not fit the 2-byte length prefix, the socket is closed and
	 * nothing is sent.
	 *
	 * @param socket the client connection to write to
	 * @param respkt the response packet
	 * @throws IOException on a write error
	 */
	private void sendPacket(Socket socket, Packet351 respkt)
			throws IOException {
		byte[] resp_bytes;
		if (this.chnl.getBoolean("indirect")) {
			resp_bytes = respkt.packBytes();
		} else {
			resp_bytes = respkt.inBytes();
			if (resp_bytes == null) {
				// NOTE: a check that refused a hand-built success response
				// (field 39 == "00") in this mode used to live here and was
				// disabled; such packets are now simply re-packed.
				resp_bytes = respkt.packBytes();
			}
		}
		// packBytes() may fail in either branch; never dereference a null.
		if (resp_bytes == null) {
			logger.error("重组响应报文字节流错误，发送响应报文失败");
			socket.close();
			return;
		}
		// A longer body would be silently truncated by the cast below and
		// the client would read a wrong length.
		if (resp_bytes.length > MAX_PREFIX_VALUE) {
			logger.error("响应报文长度[{}]超出长度前缀范围，发送响应报文失败",
					resp_bytes.length);
			socket.close();
			return;
		}
		DataOutputStream data_os = new DataOutputStream(
				socket.getOutputStream());
		this.writePacketSize(data_os, (short) resp_bytes.length);
		data_os.write(resp_bytes);
		data_os.flush();
		ByteUtils.log("响应客户端报文:", resp_bytes);
	}

	/**
	 * Entry point executed on the worker's own thread.
	 *
	 * <p>Serves the client until it disconnects, an error occurs or the
	 * script returns no response, then closes the socket. The socket is also
	 * closed when an unchecked exception escapes request processing.
	 */
	@Override
	public void run() {
		try {
			File main = Runtime.fileFor("/lua/main.lua");
			if (main == null || !main.exists()) {
				// main may be null here, so do not call methods on it blindly.
				logger.fatal("文件[{}]不存在，安装不完整，不能提供服务.",
						main == null ? "/lua/main.lua"
								: main.getAbsolutePath());
				return;
			}
			this.serve(main);
			logger.info("客户端[{}][{}]的请求处理完毕.",
					accept_socket.getInetAddress().getHostAddress(),
					accept_socket.getPort());
		} finally {
			this.closeSocket();
		}
	}

	/**
	 * Request/response loop: processes packets until the conversation ends.
	 *
	 * @param main the main Lua script executed for every request
	 */
	private void serve(File main) {
		while (true) {
			try {
				byte[] bytes = this.readPacket(accept_socket);
				if (bytes == null) {
					logger.error("读取客户端[{}][{}]请求报文错误，放弃处理.",
							accept_socket.getInetAddress().getHostAddress(),
							accept_socket.getPort());
					return;
				}
				Packet351 packet = new Packet351();
				if (!packet.readBytes(bytes, -1)) {
					logger.error("解析客户端报文错误.");
					return;
				}
				Packet351 respkt = this.doScript(main, packet);
				// If the script returned no response packet, just drop the
				// connection.
				if (respkt == null) {
					return;
				}
				sendPacket(accept_socket, respkt);
			} catch (EOFException e) {
				logger.info("与客户端[{}][{}]通讯结束[客户端关闭连接].",
						accept_socket.getInetAddress().getHostAddress(),
						accept_socket.getPort());
				return;
			} catch (IOException e) {
				logger.error("与客户端[{}][{}]通讯错误[{}].",
						accept_socket.getInetAddress().getHostAddress(),
						accept_socket.getPort(), e.getMessage());
				return;
			}
		}
	}

	/**
	 * Closes the client socket if it is still open, logging any failure.
	 */
	private void closeSocket() {
		try {
			if (!accept_socket.isClosed()) {
				accept_socket.close();
			}
		} catch (IOException e) {
			logger.catching(e);
		}
	}

	/**
	 * Runs the Lua script for one request.
	 *
	 * @param script the Lua script to evaluate
	 * @param packet the parsed request packet
	 * @return the response packet produced by the script, or {@code null}
	 *         when the script returned nil or something that is not a
	 *         {@link Packet351} wrapper; in that case the transaction record
	 *         is annotated through {@link #doFailed(String, int)}
	 */
	private Packet351 doScript(File script, Packet351 packet) {
		LuaInterp interp = new LuaInterp();
		LuaRequest reqt = new LuaRequest(packet);
		interp.load(reqt);
		interp.set("resp", LuaValue.tableOf());

		// Channel information; the script gets a modified copy so the shared
		// channel record is left untouched.
		JdbcRecord ichnl = chnl.clone();
		ichnl.setField("merch_chnl", 1);
		interp.set("reqt.ichnl", new LuaJdbcRecord(ichnl).call());

		// Network information: server (s_*) and client (c_*) endpoints.
		JsonObject netinfo = new JsonObject();
		netinfo.addProperty("s_addr",
				accept_socket.getLocalAddress().getHostAddress());
		netinfo.addProperty("s_port", accept_socket.getLocalPort());
		netinfo.addProperty("c_addr",
				accept_socket.getInetAddress().getHostAddress());
		netinfo.addProperty("c_port", accept_socket.getPort());
		interp.set("reqt.netinfo", netinfo.toString());

		// Load the Lua modules provided by the transaction system.
		interp.require("core.lib");

		// Load the modules specific to this channel; they differ per channel.
		interp.require("core.p351.reqt");
		interp.require("core.p351.resp");

		String uuid = reqt.getUUID().toString();
		logger.info("开始处理交易[{}].", uuid);

		// Run the script; it is expected to return the response packet.
		// When it fails, null is returned, which makes the POS terminal
		// reverse the transaction automatically.
		LuaValue respkt = interp.eval(script);
		if (respkt.isnil()) {
			logger.error("交易[{}]处理失败[返回 nil].", uuid);
			this.doFailed(uuid, reqt.getUUIDNo());
			return null;
		}
		if (!respkt.istable() || respkt.get("_object").isnil()) {
			logger.error("交易[{}]处理失败[返回值 {} 不是有效的报文].", uuid,
					respkt.typename());
			this.doFailed(uuid, reqt.getUUIDNo());
			return null;
		}
		try {
			// checkuserdata raises LuaError when _object is not a Packet351.
			Object retpkt = respkt.get("_object").checkuserdata(Packet351.class);
			logger.info("交易[{}]处理结束，开始返回报文到客户端.", uuid);
			return (Packet351) retpkt;
		} catch (LuaError e) {
			logger.error("交易[{}]处理失败[返回值不是有效报文或报文类型不匹配].", uuid);
			this.doFailed(uuid, reqt.getUUIDNo());
			return null;
		}
	}

	/**
	 * Records a note on the transaction row when processing failed.
	 *
	 * <p>A database error is logged and otherwise ignored: this is a
	 * best-effort annotation and must not mask the original failure.
	 *
	 * @param uuid the transaction UUID
	 * @param uuid_no the transaction sequence number within that UUID
	 */
	private void doFailed(String uuid, int uuid_no) {
		try {
			List<Object> params = new ArrayList<>();
			params.add("交易处理异常，未返回响应报文");
			params.add(uuid);
			params.add(Integer.toString(uuid_no));
			Jdbc.update("update pay_trlist set notes = ? "
					+ "where uuid = ? and uuid_no = ?", params);
		} catch (SQLException e) {
			// Trailing throwable keeps the stack trace in the log.
			logger.error("更新数据表错误[{}].", e.getMessage(), e);
		}
	}

}
